package com.hanxiaozhang.delayqueue.no3;

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.defaultThreadFactory;

/**
 * 〈一句话功能简述〉<br>
 * 〈Redisson实现延迟队列〉
 *
 * @author hanxinghua
 * @create 2022/9/17
 * @since 1.0.0
 */
@Slf4j
public class RedissonDelayQueue {

    public static void main(String[] args) {

        RedissonDelayQueue delayQueue = new RedissonDelayQueue();
        delayQueue.push(1,120);
        delayQueue.push(2,100);
    }

    private static final String DELAY_QUEUE_NAME = "redisson-delay-queue-test";

    private RDelayedQueue<Long> delayedQueue;

    private RBlockingQueue<Long> blockQueue;

    public RedissonDelayQueue() {
        Config config = new Config();
        config.useSingleServer().setAddress("http://127.0.0.1:6379").setDatabase(0);
        RedissonClient client = Redisson.create(config);
        client.getBlockingQueue(DELAY_QUEUE_NAME);
        this.blockQueue = client.getBlockingQueue(DELAY_QUEUE_NAME);
        this.delayedQueue = client.getDelayedQueue(blockQueue);
        this.loopEvent();
    }


    public void push(long messageId, long delayTime) {
        delayedQueue.offerAsync(messageId, delayTime, TimeUnit.SECONDS);
    }


    public void loopEvent() {
        // 启用一个单线程池
        ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        singleThreadPool.execute(() -> {
            while (true) {
                try {
                    // 具体处理消息的业务逻辑,take方法会堵塞直到获取到数据
                    handleDelayMessage(blockQueue.take());
                } catch (Exception e) {
                    log.error("消费延迟推送消息异常", e);
                }
            }
        });
    }


    /**
     * 具体业务逻辑
     *
     * @param messageId
     */
    private void handleDelayMessage(Long messageId) {

        System.out.println("具体业务逻辑 :"+messageId);
    }


}
